dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみた
データアナリティクス事業本部 機械学習チームの鈴木です。
dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみました。
この記事について
試した内容について
Athenaはパーティション分割されたテーブルに対してINSERT INTOおよびCTASでデータ作成する際に、一度に100個を超えるパーティションにデータを作成することはできないのでした。
100個を超えるパーティションにデータを作成したい場合、以下のガイドのように100個以下のパーティションが対象になるようにフィルタリングし、繰り返しデータ作成をすることになります。
Athenaではパーティションキーは100個以内の限られたカーディナリティのものを使うことになりますが、実際のところ年月日のような値はパーティションキーによく使われているので、このような設計にしている場合、100個くらいはあっという間に超えてしまいます。このとき過去のデータを作り直す機能としては、例えば先に記載したようにforなどを使って100個ずつAthenaで実行し直すというような実装をする必要がありました。
ところで、dbt-athenaを使っている場合、特にIncremental modelsなどではパーティション分割していることも多いかと思いますが、データの作り直しをする際にdbt run --full-refresh
を実行すると100個の制限にかかって失敗してしまうとかはないのかなと疑問に思ったので、試してみた内容をご紹介します。
結果としては、その制限にかかってモデル作成が失敗することはなく、dbt-athena側で100個に収まるようにパーティションを分けてデータ作成してくれることが分かり、とても使いやすいなと思いました。
環境
- dbt: 1.6.6
- dbt-athena-community: 1.6.4
- Python: 3.9.6
- Athena エンジンバージョン 3
検証の準備
データの作成
以下のように2つのカラムを持つCSVファイルを手作りしました。partiton_key
カラムは名前通り、パーティショニングするときのパーティションキーにします。
partiton_key,item_value 1,sample 2,sample 3,sample 4,sample 5,sample 6,sample 7,sample 8,sample 9,sample 10,sample
上のようなCSVファイルを、以下の2パターン作成しました。
partiton_key
が100個のCSVファイルpartiton_key
が101個のCSVファイル
今回100個の制限に引っかかるか確認するなら、No.2のCSVファイルでパーティションを作成した際にエラーになるかどうかを見れば良さそうですね。
上記のファイルをS3にアップロードし、以下のGlueテーブルから検索できるようにしておきました。
CREATE EXTERNAL TABLE `partitioning_sample_table`( `partition_key` string, `item_value` string COMMENT 'from deserializer') ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://データをアップロードしたS3バケット名/データをアップロードしたパス' TBLPROPERTIES ( 'classification'='csv', 'columnsOrdered'='false', 'compressionType'='none', 'delimiter'=',', 'skip.header.line.count'='1', 'transient_lastDdlTime'='1697863217')
dbtのモデルの準備
以下のようにモデルを作成しました。partition_key
カラムでパーティション分割したモデルです。
{{ config( materialized='table', partitioned_by=['partition_key'], ) }} SELECT item_value, partition_key FROM {{ source('sample','partitioning_sample_table') }}
ソーステーブルは以下のように定義しておきました。
version: 2 sources: - name: sample database: awsdatacatalog schema: cm-nayuts-dbt-athena tables: - name: partitioning_sample_table
モデルを作成する
100個までのとき
No.1のデータをS3バケットに配置してからdbt run
でモデルを実行し、AthenaでどのようなSQLが実行されるかを確認しました。
100個までのときはシンプルで、CTASでstg_partitioning_sample_model
を作成していました。
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with ( table_type='hive', is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/215c9454-40c2-403f-9424-73bf5991200f', partitioned_by=ARRAY['partition_key'], format='parquet' ) as SELECT item_value, partition_key FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ SELECT '{"rowcount":100,"data_scanned_in_bytes":1115}'
101個までのとき
次に、No.2のデータをS3バケットに配置してからdbt run
でモデルを実行し、AthenaでどのようなSQLが実行されるかを確認しました。
こちらはまずHIVE_TOO_MANY_OPEN_PARTITIONS: Exceeded limit of 100 open writers for partitions/buckets.
エラーでSQLが失敗しました。
以降は一時用のテーブルを作成し、stg_partitioning_sample_model
を100を超えないようにパーティションをWHERE句で選択してCTASで作成し、溢れた分はINSERT INTOで格納していました。
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" with ( table_type='hive', is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model__tmp_not_partitioned/b2e1c475-47a7-45f6-9364-7edec9d3801b', format='parquet' ) as SELECT item_value, partition_key FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ select distinct partition_key from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" order by partition_key
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with ( table_type='hive', is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/a74f65db-69fa-45e6-a18e-ae69ae0b7e84', partitioned_by=ARRAY['partition_key'], format='parquet' ) as select "item_value", "partition_key" from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" where (partition_key='1') or (partition_key='10') or (partition_key='100') or (partition_key='101') or (partition_key='11') or (partition_key='12') or (partition_key='13') or (partition_key='14') or (partition_key='15') or (partition_key='16') or (partition_key='17') or (partition_key='18') or (partition_key='19') or (partition_key='2') or (partition_key='20') or (partition_key='21') or (partition_key='22') or (partition_key='23') or (partition_key='24') or (partition_key='25') or (partition_key='26') or (partition_key='27') or (partition_key='28') or (partition_key='29') or (partition_key='3') or (partition_key='30') or (partition_key='31') or (partition_key='32') or (partition_key='33') or (partition_key='34') or (partition_key='35') or (partition_key='36') or (partition_key='37') or (partition_key='38') or (partition_key='39') or (partition_key='4') or (partition_key='40') or (partition_key='41') or (partition_key='42') or (partition_key='43') or (partition_key='44') or (partition_key='45') or (partition_key='46') or (partition_key='47') or (partition_key='48') or (partition_key='49') or (partition_key='5') or (partition_key='50') or (partition_key='51') or (partition_key='52') or (partition_key='53') or (partition_key='54') or (partition_key='55') or (partition_key='56') or (partition_key='57') or (partition_key='58') or (partition_key='59') or (partition_key='6') or (partition_key='60') or (partition_key='61') or (partition_key='62') or (partition_key='63') or (partition_key='64') or (partition_key='65') or (partition_key='66') or (partition_key='67') or (partition_key='68') or (partition_key='69') or (partition_key='7') or (partition_key='70') or (partition_key='71') or (partition_key='72') or (partition_key='73') or (partition_key='74') or (partition_key='75') or (partition_key='76') or (partition_key='77') or (partition_key='78') or (partition_key='79') or (partition_key='8') or (partition_key='80') or (partition_key='81') or (partition_key='82') or (partition_key='83') or (partition_key='84') or (partition_key='85') or (partition_key='86') or (partition_key='87') or (partition_key='88') or (partition_key='89') or (partition_key='9') or (partition_key='90') or (partition_key='91') or (partition_key='92') or (partition_key='93') or (partition_key='94') or (partition_key='95') or (partition_key='96') or (partition_key='97') or (partition_key='98')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ insert into "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" ("item_value", "partition_key") select "item_value", "partition_key" from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" where (partition_key='99')
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ SELECT '"awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with many partitions created'
厳密にはdbt-athena-communityの実装を確認すべきですが、パーティション数超過のエラーを受け取るとCTASとINSERT INTOを組み合わせてデータ格納をするロジックに切り替えてくれるようですね。
Incremental modelsの場合
Incremental modelsの場合でも同様に実行できるか確認しました。
まず、モデルの定義を以下のように変更しました。
{{ config( materialized='incremental', partitioned_by=['partition_key'], ) }} SELECT item_value, partition_key FROM {{ source('sample','partitioning_sample_table') }} {% if is_incremental() %} where partitioned_by > (select max(partitioned_by) from {{ this }}) {% endif %}
次に、No.2のファイルをS3バケットに配置した状態で、dbt run --full-refresh
を実行し、以下の順にSQLが実行されることを確認しました。SQLの内容としては、微妙に違いはあるものの、ロジックとしては先に確認したものと同じでした。
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" with ( table_type='hive', is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model__tmp_not_partitioned/59ea3001-4893-4a65-8ceb-84c238a3cc43', format='parquet' ) as SELECT item_value, partition_key FROM "awsdatacatalog"."cm-nayuts-dbt-athena"."partitioning_sample_table"
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ select distinct partition_key from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" order by partition_key
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ create table "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with ( table_type='hive', is_external=true,external_location='s3://profileに設定したS3バケット/dbt-athena-data/cm-nayuts-dbt-athena/stg_partitioning_sample_model/3eca27c2-6a2a-46ae-bd90-a9fd48f3f78e', partitioned_by=ARRAY['partition_key'], format='parquet' ) as select "item_value", "partition_key" from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" where (partition_key='1') or (partition_key='10') or (partition_key='100') or (partition_key='101') or (partition_key='11') or (partition_key='12') or (partition_key='13') or (partition_key='14') or (partition_key='15') or (partition_key='16') or (partition_key='17') or (partition_key='18') or (partition_key='19') or (partition_key='2') or (partition_key='20') or (partition_key='21') or (partition_key='22') or (partition_key='23') or (partition_key='24') or (partition_key='25') or (partition_key='26') or (partition_key='27') or (partition_key='28') or (partition_key='29') or (partition_key='3') or (partition_key='30') or (partition_key='31') or (partition_key='32') or (partition_key='33') or (partition_key='34') or (partition_key='35') or (partition_key='36') or (partition_key='37') or (partition_key='38') or (partition_key='39') or (partition_key='4') or (partition_key='40') or (partition_key='41') or (partition_key='42') or (partition_key='43') or (partition_key='44') or (partition_key='45') or (partition_key='46') or (partition_key='47') or (partition_key='48') or (partition_key='49') or (partition_key='5') or (partition_key='50') or (partition_key='51') or (partition_key='52') or (partition_key='53') or (partition_key='54') or (partition_key='55') or (partition_key='56') or (partition_key='57') or (partition_key='58') or (partition_key='59') or (partition_key='6') or (partition_key='60') or (partition_key='61') or (partition_key='62') or (partition_key='63') or (partition_key='64') or (partition_key='65') or (partition_key='66') or (partition_key='67') or (partition_key='68') or (partition_key='69') or (partition_key='7') or (partition_key='70') or (partition_key='71') or (partition_key='72') or (partition_key='73') or (partition_key='74') or (partition_key='75') or (partition_key='76') or (partition_key='77') or (partition_key='78') or (partition_key='79') or (partition_key='8') or (partition_key='80') or (partition_key='81') or (partition_key='82') or (partition_key='83') or (partition_key='84') or (partition_key='85') or (partition_key='86') or (partition_key='87') or (partition_key='88') or (partition_key='89') or (partition_key='9') or (partition_key='90') or (partition_key='91') or (partition_key='92') or (partition_key='93') or (partition_key='94') or (partition_key='95') or (partition_key='96') or (partition_key='97') or (partition_key='98')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ insert into "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" ("item_value", "partition_key") select "item_value", "partition_key" from "awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model__tmp_not_partitioned" where (partition_key='99')
-- /* {"app": "dbt", "dbt_version": "1.6.6", "profile_name": "test_project", "target_name": "dev", "node_id": "model.test_project.stg_partitioning_sample_model"} */ select '"awsdatacatalog"."cm-nayuts-dbt-athena"."stg_partitioning_sample_model" with many partitions created'
alter table `cm-nayuts-dbt-athena`.`stg_partitioning_sample_model` set tblproperties ('classification' = 'parquet')
終わりに
dbt-athenaでHIVEテーブルに一度に100を超えるパーティションの書き込みができるか確認してみました。
パーティション分割している際に、過去データの作り直しで悩むことがありましたが、dbt-athenaでモデル作成しているときは考えなくてもdbtがよしなにやってくることが分かり安心しました。
補足
パーティション数の制限について
今回の検証では100個を超えてもdbt-athenaがよしなに対処してくれることが分かりましたが、パーティション数が非常に大きい場合はパーティションキーの設計自体を考えた方が良いです。
例えばパーティションにデータを作成すると、S3にオブジェクトを作成するためのPUTリクエストが行われるため、東京リージョンだと0.0047USD/1000リクエストの費用がかかります。一見は安く感じますが、細かくパーティションを分けているとオブジェクト作成時やその後のファイル取得時などに思いがけず費用がかかってしまう可能性があります。
100個の制限を設けてユーザーが意識的にパーティションを作りすぎないようになっているのは大事なことだと思います。そもそも本来は非常に高いパーティションキーでパーティションを分けることはせず、Bucketingと組み合わせたり、Icebergテーブルにしてhidden partitioningのようなパーティションキーのカーディナリティを低くするような仕組みを利用したりすると良いように思います。